import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pickle
# Visualizations will be shown in the notebook.
%matplotlib inline
# Paths to the pickled German Traffic Sign splits.
training_file = "dataset/train.p"
validation_file = "dataset/valid.p"
testing_file = "dataset/test.p"

def _unpickle(path):
    """Load one pickled dataset dict (keys 'features' and 'labels') from *path*."""
    with open(path, mode='rb') as fh:
        return pickle.load(fh)

train = _unpickle(training_file)
valid = _unpickle(validation_file)
test = _unpickle(testing_file)

# Split each dict into image arrays (X) and label vectors (y).
X_train, y_train = train['features'], train['labels']
X_valid, y_valid = valid['features'], valid['labels']
X_test, y_test = test['features'], test['labels']

# Basic dataset statistics.
n_train = X_train.shape[0]
n_validation = X_valid.shape[0]
n_test = X_test.shape[0]
image_shape = X_train.shape[1:]
n_classes = len(np.unique(y_train))

print("Number of training examples =", n_train)
print("Number of validation examples =", n_validation)
print("Number of testing examples =", n_test)
print("Image data shape =", image_shape)
print("Number of classes =", n_classes)
All training/validation/test data are already packaged in pickle files. After unpacking, the number of training examples is roughly three times the number of testing examples. We reserve more than 4,410 images for validation and for tuning the hyperparameters.
Image size is (32, 32, 3) meaning the image width x height is 32 x 32. And 3 RGB channels.
We have 43 unique traffic sign labels as given from the signnames.csv file. As shown below. Notice images have varied degree of brightness and contrasts which we have to fix.
# Human-readable sign names, indexed by class id (SignName column of signnames.csv).
sign_names = list(pd.read_csv("dataset/signnames.csv")['SignName'])
import random
def display_images(Xd, yd, labels_names, num_display=10, squeeze=False, cmap=None):
    """Print the per-class counts and show a strip of random sample images per class.

    Args:
        Xd: image array, indexable as Xd[i] -> HxWxC image.
        yd: integer class labels aligned with Xd.
        labels_names: human-readable name for each class index.
        num_display: number of images shown per class (capped at the class size).
        squeeze: if True (together with cmap), squeeze single-channel images for display.
        cmap: matplotlib colormap applied when squeeze is set.
    """
    # minlength guards against an IndexError when trailing class ids have no samples.
    samples_per_class = np.bincount(yd, minlength=len(labels_names))
    for i, label in enumerate(labels_names):
        print("Index {0}. Sign name: {1}. Number of signs: {2}".format(i, label, samples_per_class[i]))
        candidates = list(np.where(yd == i)[0])
        # random.sample raises ValueError when asked for more items than exist; clamp.
        samples = random.sample(candidates, min(num_display, len(candidates)))
        fig = plt.figure(figsize=(num_display, 1))
        fig.subplots_adjust(hspace=0, wspace=0)
        for j, idx in enumerate(samples):
            img = Xd[idx]
            axis = fig.add_subplot(1, num_display, j + 1, xticks=[], yticks=[])
            if squeeze and cmap:
                axis.imshow(img.squeeze(), cmap=cmap)
            else:
                axis.imshow(img)
        plt.show()
# Show class counts and random sample images for the raw training set.
display_images(X_train, y_train, sign_names)
Let's plot the sample distribution among the 43 classes. As clearly shown, most classes are under-sampled, with fewer than 1,000 training images each. Training directly on this dataset is risky: the model may fail to recognize traffic signs from classes with few training examples.
# Class-frequency overview of the raw labels: a per-class count curve
# overlaid with a histogram of the labels themselves.
fig = plt.figure(figsize=(15, 5))
class_counts = np.bincount(y_train)
plt.plot(class_counts)
plt.hist(y_train, bins=n_classes)
plt.title('Before Augmentation')
plt.show()
Our goal is to utilize image-transformation techniques to augment the datasets and balance the sampling distribution.
For the class ImageEffect, we have the following methods:
- randomize() generates a random boolean array to select x transformations out of the 5 choices.
- translate() builds an M-matrix from random x- and y-shifts and applies cv2.warpAffine().
- rotate() feeds the output of cv2.getRotationMatrix2D((x/2, y/2), rotate_degree, scale) into cv2.warpAffine().
- shear() maps a source triangle onto a randomly jittered one via cv2.getAffineTransform() and cv2.warpAffine().
- blur() uses cv2.GaussianBlur().
- gamma() brightens the image through a lookup table np.array([((i / 255.0) ** invGamma) * 255 for i in np.arange(0, 256)]).astype("uint8") applied with cv2.LUT(image, table). The formula is $\left[\left(\tfrac{i}{255}\right)^{\frac{1}{\gamma}} \cdot 255\right]$.
import cv2
from itertools import compress
class ImageEffect:
    """A bundle of image transformations used to augment the dataset.

    Effects: translation, rotation, shear, blur, and gamma correction.
    Source: @eqbal, https://li8bot.wordpress.com/2014/07/21/opencvpythonpart3-geometric-transformations-2/
    """

    def randomize(self, image, show=False):
        """Apply a random subset of the five effects.

        Returns a (applied_effect_names, transformed_image) pair.
        """
        lottery = np.random.randint(0, 2, [5]).astype('bool')
        types = ["Translate", "Rotate", "Shear", "Blur", "Gamma"]
        # Apply the drawn effects in a fixed order, feeding each result forward.
        effects = (self.translate, self.rotate, self.shear, self.blur, self.gamma)
        for drawn, effect in zip(lottery, effects):
            if drawn:
                image = effect(image)
        return list(compress(types, lottery)), image

    def translate(self, image, show=False):
        """Shift the image by a random offset of up to 30% along each axis."""
        y, x, channel = image.shape
        y_shift, x_shift = np.random.uniform(-0.3*y, 0.3*y), np.random.uniform(-0.3*x, 0.3*x)
        if show:
            print("y-axis:", y_shift, "x-axis:", x_shift)
        shift_matrix = np.float32([[1, 0, x_shift], [0, 1, y_shift]])
        return cv2.warpAffine(image, shift_matrix, (x, y))

    def rotate(self, image, show=False, rotate_degree=90):
        """Rotate about the image centre by rotate_degree (anticlockwise), scale 1."""
        y, x, channel = image.shape
        rotation = cv2.getRotationMatrix2D((x / 2, y / 2), rotate_degree, 1)
        if show:
            print(rotation)
        return cv2.warpAffine(image, rotation, (x, y))

    def shear(self, image):
        """Apply a random affine shear; parallel lines remain parallel."""
        y, x, channel = image.shape
        src_triangle = np.float32([[5, 5], [20, 5], [5, 20]])
        shear = np.random.randint(5, 15)
        target1 = 5 + shear * np.random.uniform() - shear / 2
        target2 = 20 + shear * np.random.uniform() - shear / 2
        dst_triangle = np.float32([[target1, 5], [target2, target1], [5, target2]])
        # Affine transform mapping the source triangle onto the jittered one.
        warp = cv2.getAffineTransform(src_triangle, dst_triangle)
        return cv2.warpAffine(image, warp, (x, y))

    def blur(self, image):
        """Smooth the image with a 5x5 Gaussian kernel."""
        return cv2.GaussianBlur(image, (5, 5), 0)

    def gamma(self, image, gamma=1.5, show=False):
        """Gamma-correct (brighten) the image via a 256-entry lookup table.

        Source : https://www.pyimagesearch.com/2015/10/05/opencv-gamma-correction/
        """
        if np.mean(image) < 50:
            gamma *= 3  # Much stronger correction for very dark images.
        if show:
            print(np.mean(image), gamma)
        # Map each intensity i in [0, 255] through ((i/255) ** (1/gamma)) * 255.
        invGamma = 1.0 / gamma
        table = np.array([((i / 255.0) ** invGamma) * 255 for i in np.arange(0, 256)]).astype("uint8")
        return cv2.LUT(image, table)
Let's test the effects on 5 random images
# Demonstrate each individual effect (plus one random combination) on 5
# randomly drawn training images.
transform = ImageEffect()
for _ in range(5):
    samp = X_train[np.random.randint(n_train)]
    dst_translate = transform.translate(samp)
    dst_rotate = transform.rotate(samp, rotate_degree=180)
    dst_shear = transform.shear(samp)
    dst_gamma = transform.gamma(samp)
    dst_gb = transform.blur(samp)
    random_types, dst_random = transform.randomize(samp)
    panels = [(samp, 'Input'), (dst_translate, 'Translate'), (dst_rotate, 'Rotate'),
              (dst_shear, 'Shear'), (dst_gamma, 'Gamma'), (dst_gb, 'Blur'),
              (dst_random, random_types)]
    plt.figure(figsize=(15, 15))
    for pos, (img, title) in enumerate(panels, start=1):
        plt.subplot(1, 7, pos)
        plt.imshow(img)
        if pos == 1:
            plt.axis('off')
        plt.title(title)
    plt.show()
Now that we have the augmentation techniques, to balance the training distribution, we consider the mean number of images across all classes. If the class is over-sampled, we won't augment. If the class is under-sampled, for each image in that class, we create scale_factor * avg/len(images) "fake images".
def augment_dataset(Xd, yd, scale_factor, shuffle=False, show=False):
    """Balance the class distribution of (Xd, yd) with synthetic images.

    For every class, each original image spawns
    ``int(scale_factor * avg / class_size)`` randomized copies (via
    ImageEffect.randomize), where ``avg`` is the mean class size rounded up.
    Classes large enough that the factor truncates to 0 are left untouched.
    Inspired from @eqbal.

    Args:
        Xd: images, shape (N, H, W, C).
        yd: integer class labels aligned with Xd.
        scale_factor: multiplier on the avg/class_size augmentation ratio.
        shuffle: if True, return the samples in random order.
        show: if True, print per-class augmentation statistics.

    Returns:
        (expanded_Xd, expanded_yd): originals plus synthetic images; images
        keep the input dtype (the original returned float64 only because of
        an internal all-zeros placeholder row), labels are int64.
    """
    class_idxs, images_per_class = np.unique(yd, return_counts=True)
    avg = np.ceil(np.mean(images_per_class, axis=0)).astype('uint32')
    if show:
        print("Average = ", avg)
    transform = ImageEffect()
    all_images = []
    all_labels = []
    # Label by actual class value (the original used the enumerate position,
    # which is only correct when every class id 0..K-1 is present).
    for idx in class_idxs:
        orig_images = Xd[yd == idx]
        factor = (scale_factor * (avg / len(orig_images))).astype('uint32')
        new_images = [transform.randomize(img)[1]
                      for img in orig_images
                      for _ in range(factor)]
        if show:
            print("For class idx = {0}, factor = {1}, number of original images = {2}, number of new images added = {3}".format(idx, factor, len(orig_images), len(new_images)))
        if new_images:
            orig_images = np.concatenate((orig_images, new_images), axis=0)
        all_images.append(orig_images)
        all_labels.append(np.full(len(orig_images), idx, dtype='int64'))
    expanded_Xd = np.concatenate(all_images, axis=0)
    expanded_yd = np.concatenate(all_labels, axis=0)
    if shuffle:
        # np.random.permutation avoids the original's `from random import
        # shuffle`, which shadowed the `shuffle` parameter.
        order = np.random.permutation(len(expanded_Xd))
        return expanded_Xd[order], expanded_yd[order]
    return expanded_Xd, expanded_yd
# Augment the under-sampled classes (scale_factor=2) and shuffle the result.
expanded_X_train, expanded_y_train = augment_dataset(X_train, y_train, 2, show=True, shuffle=True)
After plotting the distributions before and after augmentation, distribution of images among classes is more balanced. No class is under-sampled.
# Compare label distributions before vs. after augmentation: first as
# per-class count curves, then as histograms.
plt.figure(figsize=(15, 5))
ax = plt.subplot(121)
ax.plot(np.bincount(y_train))
ax.set_title('Before Augmentation')
ax = plt.subplot(122)
ax.plot(np.bincount(expanded_y_train))
ax.set_title('After Augmentation')
plt.show()

plt.figure(figsize=(15, 5))
ax = plt.subplot(121)
ax.hist(y_train, bins=n_classes)
ax.set_title('Before Augmentation')
ax = plt.subplot(122)
ax.hist(expanded_y_train, bins=n_classes)
ax.set_title('After Augmentation')
plt.show()
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import copy
import torch.utils.data as data_utils
Then, using torch.utils.data.TensorDataset, I wrapped the numpy training/validation/test image matrices as tensor datasets.
Using torch.utils.data.DataLoader, I sampled shuffled batches of 64 images from the traffic_train and traffic_val datasets.
# Wrap the numpy arrays as TensorDatasets (HWC -> CHW, pixel values scaled
# to [0, 1]) and expose them through shuffling batch-64 loaders.
def _to_dataset(images, labels):
    """Build a TensorDataset from HWC images and integer labels."""
    chw = np.transpose(images / 255.0, (0, 3, 1, 2))
    return data_utils.TensorDataset(torch.from_numpy(chw), torch.from_numpy(labels))

traffic_train = _to_dataset(expanded_X_train, expanded_y_train)
loader_train = data_utils.DataLoader(traffic_train, batch_size=64, shuffle=True)
traffic_val = _to_dataset(X_valid, y_valid)
loader_val = data_utils.DataLoader(traffic_val, batch_size=64, shuffle=True)
traffic_test = _to_dataset(X_test, y_test)
loader_test = data_utils.DataLoader(traffic_test, batch_size=64, shuffle=True)
Helper functions to flatten a multi-dimensional Tensors to 2-D Tensors ready to be fit in Softmax
# Train on GPU when available; log progress every `print_every` minibatches.
use_gpu = torch.cuda.is_available()
print_every = 100
def reset(m):
    """Re-initialize the parameters of module *m* in place, if it knows how."""
    reset_op = getattr(m, 'reset_parameters', None)
    if reset_op is not None:
        reset_op()
class Flatten(nn.Module):
    """Collapse (N, C, H, W) activations to (N, C*H*W) ahead of linear layers."""
    def forward(self, x):
        batch, channels, height, width = x.size()
        # Keep the batch dimension; fold everything else into one vector.
        return x.view(batch, -1)
I built a simple LeNet structure using nn.Sequential. To check the final output dimensions, I feed in a random Torch tensors of (64, 3, 32, 32) to get an output Tensor of (64, 43) which is the scores for 43 classes of 64 input Tensors.
# LeNet-style classifier: two 5x5 convolutions, batch-norm + max-pool +
# dropout, then three fully-connected layers down to the class scores.
_lenet_layers = [
    nn.Conv2d(3, 6, kernel_size=5),
    nn.ReLU(inplace=True),
    nn.Conv2d(6, 16, kernel_size=5),
    nn.ReLU(inplace=True),
    nn.BatchNorm2d(16),
    nn.MaxPool2d(2, stride=2),
    nn.Dropout(p=0.5),
    Flatten(),
    nn.Linear(16 * 12 * 12, 120),
    nn.ReLU(inplace=True),
    nn.Linear(120, 84),
    nn.ReLU(inplace=True),
    nn.Linear(84, n_classes),
]
lenet = nn.Sequential(*_lenet_layers)
# Sanity check: a (64, 3, 32, 32) batch must map to (64, n_classes) scores.
lenet(Variable(torch.randn(64, 3, 32, 32))).size()
I wrote a train script that performs training for a given number of epochs. After each epoch, the script cycles through the validation set to compute the accuracy. If it exceeds the best accuracy so far, the model state is saved.
import time
def train(model, loss_fn, optimizer, num_epochs=1, plot=True, use_gpu=True):
    """Train *model*, validating after every epoch and checkpointing the best.

    Relies on the module-level `loader_train` / `loader_val` DataLoaders and
    `print_every`. After each epoch the validation accuracy is computed; when
    it beats the best seen so far, the model state is written to
    "checkpoint.best_of_<start>.pth.tar".

    Args:
        model: the network to train (its parameters are reset first).
        loss_fn: criterion mapping (scores, targets) -> scalar loss.
        optimizer: optimizer constructed over model.parameters().
        num_epochs: number of passes over the training data.
        plot: if True, plot per-minibatch loss/accuracy after each training epoch.
        use_gpu: if True, move each batch to CUDA before the forward pass.

    Returns:
        A deep copy of the model at its best validation accuracy.
    """
    reset(model)
    # time.time() stamps the checkpoint filename. BUG FIX: the original used
    # time.clock(), which was deprecated since 3.3 and removed in Python 3.8.
    start_time = str(round(time.time()))
    best_accuracy = 0.0
    best_model = model
    for epoch in range(num_epochs):
        print("--------------")
        print("Starting epoch %d / %d" % (epoch + 1, num_epochs))
        for phase in ['train', 'val']:
            if phase == "train":
                model.train()  # Enable dropout and batch-norm updates.
                loader = loader_train
            else:
                model.eval()  # Fixed batch-norm statistics, dropout off.
                loader = loader_val
            running_loss = 0.0
            running_corrects = 0
            losses = []
            accuracies = []
            num_samples = 0
            for t, (x, y) in enumerate(loader):
                # NOTE(review): Variable / loss.data[0] are pre-0.4 PyTorch
                # idioms; on modern torch use tensors directly and loss.item().
                if use_gpu:
                    x_var = Variable(x.float().cuda())
                    y_var = Variable(y.long().cuda())
                else:
                    x_var = Variable(x.float())
                    y_var = Variable(y.long())
                scores = model(x_var)
                _, preds = torch.max(scores.data, 1)
                loss = loss_fn(scores, y_var)
                batch_size = preds.size(0)
                batch_loss = loss.data[0]
                batch_corrects = torch.sum(preds == y_var.data)
                batch_acc = batch_corrects / batch_size
                losses.append(batch_loss)
                accuracies.append(batch_acc)
                running_loss += batch_loss * batch_size
                running_corrects += batch_corrects
                num_samples += batch_size
                # Progress log on training only; validation stays quiet.
                if phase == "train" and (t + 1) % print_every == 0:
                    print('t = %d, loss = %.4f, acc = %.4f' % (t + 1, batch_loss, batch_acc))
                # Backprop only during the training phase.
                if phase == "train":
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
            if phase == "train" and plot:
                fig, ax = plt.subplots(figsize=(10, 5))
                plt.grid(True)
                line1, = ax.plot(losses, label='Losses')
                line2, = ax.plot(accuracies, label='Accuracy')
                ax.legend(loc='lower right')
                plt.title('Epoch {} Loss'.format(epoch+1))
                plt.xlabel('Minibatch Number')
                plt.show()
            epoch_loss = running_loss / num_samples
            epoch_acc = running_corrects / num_samples
            print('Phase {}, Epoch {}, Overall Loss: {:.4f} and Accuracy: {:.4f}'.format(phase, epoch+1, epoch_loss, epoch_acc))
            # Keep (and checkpoint) the model with the best validation accuracy.
            if phase == "val" and epoch_acc > best_accuracy:
                best_accuracy = epoch_acc
                best_model = copy.deepcopy(model)
                print("=> Saving a new best")
                torch.save({'epoch': epoch + 1,
                            'state_dict': model.state_dict(),
                            'best_accuracy': best_accuracy}, "checkpoint.best_of_{}.pth.tar".format(start_time))
    return best_model
I trained the LeNet structure with starting lr = 0.001 and momentum = 0.9 for 50 epochs. The best result is obtained at Epoch 39, Overall Loss: 0.2473 and Accuracy: 0.9385. At this point, the training has most likely plateaued and requires further tuning.
# Train LeNet on GPU: SGD with Nesterov momentum and L2 weight decay.
lenet = lenet.cuda()
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(lenet.parameters(), lr=0.001, momentum=0.9, dampening=0, weight_decay=0.0005, nesterov=True)
best_lenet = train(lenet, loss_fn, optimizer, num_epochs=50, plot=False)
The MobileNet is sourced from this GitHub repo that was initially used for the CIFAR10 dataset of similar dimension 32x32.
Due to a more complicated structure, the training took longer to complete.
import torch.nn.functional as F
class Block(nn.Module):
    '''Depthwise conv + Pointwise conv (one depthwise-separable unit).'''

    def __init__(self, in_planes, out_planes, stride=1):
        super(Block, self).__init__()
        # Depthwise: one 3x3 filter per input channel (groups=in_planes).
        self.conv1 = nn.Conv2d(in_planes, in_planes, kernel_size=3, stride=stride, padding=1, groups=in_planes, bias=False)
        self.bn1 = nn.BatchNorm2d(in_planes)
        # Pointwise: 1x1 convolution mixing channels up to out_planes.
        self.conv2 = nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn2 = nn.BatchNorm2d(out_planes)

    def forward(self, x):
        depthwise = F.relu(self.bn1(self.conv1(x)))
        pointwise = F.relu(self.bn2(self.conv2(depthwise)))
        return pointwise
class MobileNet(nn.Module):
    """MobileNet v1 for 32x32 inputs (cfg borrowed from a CIFAR10 implementation).

    cfg entries are either `planes` (stride 1) or a `(planes, stride)` tuple.
    """
    # (128,2) means conv planes=128, conv stride=2, by default conv stride=1
    cfg = [64, (128,2), 128, (256,2), 256, (512,2), 512, 512, 512, 512, 512, (1024,2), 1024]

    def __init__(self, num_classes=None):
        """Build the network.

        Args:
            num_classes: size of the output layer. BUG FIX: the original
                hard-wired the module-level `n_classes` here, silently
                ignoring this argument; we now fall back to the global only
                when num_classes is left unspecified, so `MobileNet()` still
                behaves as before while `MobileNet(k)` finally works.
        """
        super(MobileNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.layers = self._make_layers(in_planes=32)
        self.linear = nn.Linear(1024, n_classes if num_classes is None else num_classes)

    def _make_layers(self, in_planes):
        """Stack one depthwise-separable Block per cfg entry."""
        layers = []
        for x in self.cfg:
            out_planes = x if isinstance(x, int) else x[0]
            stride = 1 if isinstance(x, int) else x[1]
            layers.append(Block(in_planes, out_planes, stride))
            in_planes = out_planes
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layers(out)
        # A 32x32 input reaches here as 2x2x1024; average-pool it to 1x1.
        out = F.avg_pool2d(out, 2)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out
# Train MobileNet on GPU: same SGD recipe as LeNet but a higher initial lr.
mobile_net = MobileNet().cuda()
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(mobile_net.parameters(), lr=0.1, momentum=0.9, dampening=0, weight_decay=0.0005, nesterov=True)
best_mobile_net = train(mobile_net, loss_fn, optimizer, num_epochs=40)
After finishing the training phase, I feed the test images and labels to evaluate the best-so-far model state.
def check_accuracy(model, loader):
    """Print and return *model*'s accuracy over every batch of *loader*.

    Args:
        model: trained network on the GPU (inputs are moved to CUDA).
        loader: DataLoader yielding (images, labels) batches.

    Returns:
        The accuracy as a float in [0, 1] (the original printed it but
        returned nothing, which made the value unusable programmatically).
    """
    num_correct = 0
    num_samples = 0
    model.eval()  # Inference mode: fixed batch-norm stats, dropout off.
    for x, y in loader:
        # NOTE(review): Variable(..., volatile=True) is the pre-0.4 way to
        # disable autograd; modern code would use `with torch.no_grad():`.
        x_var = Variable(x.float().cuda(), volatile=True)
        scores = model(x_var)
        _, preds = scores.data.cpu().max(1)
        num_correct += (preds == y).sum()
        num_samples += preds.size(0)
    acc = float(num_correct) / num_samples
    print('Got %d / %d correct (%.2f)' % (num_correct, num_samples, 100 * acc))
    return acc
Despite much higher training and validation accuracy, my model reaches the 93% test-accuracy baseline given this number of training epochs. This could clearly be improved with a larger number of training epochs.
# Final test-set accuracy for both trained models.
check_accuracy(best_lenet, loader_test)
check_accuracy(best_mobile_net, loader_test)
Similar to @eqbal, I downloaded 20 images from the internet. Every image is resized to 32x32 to fit the trained model.
import os
test_images = []
path = "dataset/test_images/"
# Load every downloaded image, resize to the 32x32 network input size and
# convert OpenCV's BGR channel order to RGB for display and inference.
for image in os.listdir(path):
    img = cv2.imread(path + image)
    img = cv2.resize(img, (32, 32))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    test_images.append(img)
plt.figure(figsize=(15, 20))
# BUG FIX: plt.subplot requires an integer row count; len(...)/4 is a float
# under Python 3. Use ceiling division instead.
rows = (len(test_images) + 3) // 4
for i, image in enumerate(test_images):
    grid = plt.subplot(rows, 5, i + 1)
    grid.imshow(image), plt.axis('off')
plt.show()
# Stack into an NCHW float tensor scaled to [0, 1].
test_tensors = torch.from_numpy(np.transpose(np.asarray(test_images), (0,3,1,2))).float() / 255.0
def predict_and_visualize(net, test_tensors):
    """Run *net* on test_tensors and display each image with its predicted sign name.

    Uses the module-level `test_images` and `sign_names` for display.

    Args:
        net: trained network on the GPU.
        test_tensors: NCHW float tensor of the downloaded images.

    Returns:
        The raw output scores (N x n_classes) for further analysis.
    """
    net.eval()
    # NOTE(review): volatile=True is pre-0.4 PyTorch; modern code would use
    # `with torch.no_grad():` instead.
    outputs = net(Variable(test_tensors.cuda(), volatile=True))
    _, predicted = torch.max(outputs.data.cpu(), 1)
    outputclass = predicted.numpy()
    plt.figure(figsize=(15, 20))
    # BUG FIX: subplot row count must be an int (len/4 is a float under Python 3).
    rows = (len(test_images) + 3) // 4
    for i, image in enumerate(test_images):
        grid = plt.subplot(rows, 5, i + 1)
        grid.imshow(image), plt.axis('off')
        plt.title(sign_names[outputclass[i]])
    plt.show()
    return outputs
For each image, I output the top guess and they turn out pretty okay.
# Show each downloaded image with the LeNet's top-1 predicted sign name.
lenet_outputs = predict_and_visualize(best_lenet, test_tensors)
I also output the top-5 "guesses" that have the largest softmax probabilities. These are shown in the histogram below.
# Softmax over the class scores, then keep the five most probable classes per image.
probs = F.softmax(lenet_outputs, dim=1)
top_K_values, top_K_indices = torch.topk(probs.data.cpu(), dim=1, k=5)
top_K_values, top_K_indices = top_K_values.numpy(), top_K_indices.numpy()
def plot_probability_per_class(probabilities, classes, image):
    """Plot an image's top-k class probabilities beside the image itself,
    titled with the winning prediction. Source: @eqbal.
    """
    fig = plt.figure(figsize=(8,8))
    # Left panel: bar chart of the top-k class probabilities.
    bar_ax = fig.add_subplot(121)
    bar_ax.set_ylabel('Probability')
    bar_ax.set_xlabel('Class ID')
    bar_ax.set_title('Top 5 probabilities')
    bar_ax.bar(classes, probabilities, 1, color='blue', label='Inputs per class')
    winner = probabilities.argmax()
    class_id = classes[winner]
    # Right panel: the image, labelled with the most probable sign name.
    img_ax = fig.add_subplot(122)
    img_ax.set_xticks([])
    img_ax.set_yticks([])
    img_ax.set_title("Predicted: %s" % sign_names[class_id])
    img_ax.set_xlabel("Probs = {}".format(probabilities[winner]))
    img_ax.imshow(image)
    plt.show()

for p, c, i in zip(top_K_values, top_K_indices, test_images):
    plot_probability_per_class(p, c, i)
I did the same thing for the best MobileNet obtained after training for 40 epochs. The results are pretty much the same.
# Repeat the prediction + top-5 analysis for the best MobileNet.
mobilenet_outputs = predict_and_visualize(best_mobile_net, test_tensors)
probs = F.softmax(mobilenet_outputs, dim=1)
top_K_values, top_K_indices = torch.topk(probs.data.cpu(), dim=1, k=5)
top_K_values, top_K_indices = top_K_values.numpy(), top_K_indices.numpy()
# NOTE(review): this definition is byte-for-byte identical to the
# plot_probability_per_class defined earlier — consider removing the duplicate.
def plot_probability_per_class(probabilities, classes, image):
    """ For each image, input top-k probabilities, classes and visualize them. Source: @eqbal.
    """
    mpl_fig = plt.figure(figsize=(8,8))
    # Left panel: bar chart of the top-k class probabilities.
    ax = mpl_fig.add_subplot(121)
    ax.set_ylabel('Probability')
    ax.set_xlabel('Class ID')
    ax.set_title('Top 5 probabilities')
    ax.bar(classes, probabilities, 1, color='blue', label='Inputs per class')
    # Index (within the top-k arrays) of the most probable class.
    top = probabilities.argmax()
    class_id = classes[top]
    # Right panel: the image, titled with the winning prediction.
    ax = mpl_fig.add_subplot(122)
    ax.set_xticks([])
    ax.set_yticks([])
    title = "Predicted: %s" % sign_names[class_id]
    ax.set_title(title)
    ax.set_xlabel("Probs = {}".format(probabilities[top]))
    ax.imshow(image)
    plt.show()
# Visualize the MobileNet's top-5 probabilities for each downloaded image.
for p, c, i in zip(top_K_values, top_K_indices, test_images):
    plot_probability_per_class(p, c, i)